package com.lhczf.lucenedb.threads;

import com.jsoniter.JsonIterator;
import com.jsoniter.any.Any;
import com.lhczf.lucenedb.service.DataHub;
import com.lhczf.lucenedb.service.LuceneDbServer;
import com.lhczf.lucenedb.service.ServerContext;
import com.lhczf.lucenedb.util.FileUtils;
import com.lhczf.lucenedb.util.SpringUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.lucene.document.*;
import org.apache.lucene.facet.FacetField;
import org.apache.lucene.facet.FacetsConfig;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.BytesRef;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

/**
 * Worker thread that takes data-file paths from {@link DataHub#dataQueue}, turns every
 * non-blank line of each file (one JSON record per line) into a Lucene document, adds it
 * to the current {@link IndexWriter}, commits, and then deletes the file.
 *
 * <p>The thread compares the current date (formatted with {@link LuceneDbServer#Y_M_D_DTF})
 * against the date it last saw; when they differ it closes the current writer and asks
 * {@link LuceneDbServer} for a writer on the new date directory.
 */
@Slf4j
public class ConsumerThread extends Thread {

    private static final String INT_INDEX = "int";
    private static final String LONG_INDEX = "long";
    private static final String STRING_INDEX = "string";
    private static final String DOUBLE_INDEX = "double";
    private static final String TEXT_INDEX = "text";
    private static final String FACET_KEY = "facet";
    private static final String DOC_VALUE_KEY = "docvalue";
    private static final String EXCLUDED_NUMERIC_KEY = "excnumeric";
    private static final String ORDER_KEY = "order";
    /** Size, in chars, of the reader buffer used when scanning a data file. */
    private static final int READ_BUFFER_SIZE = 1024 * 1024;

    private IndexWriter indexWriter;
    private DirectoryTaxonomyWriter taxoWriter;
    /** 1-based number of the line currently being processed in the current data file. */
    private int contexIndex = 0;
    @Setter
    @Getter
    private String curentSubDir;
    private String subTodayDir = LocalDateTime.now().format(LuceneDbServer.Y_M_D_DTF);
    private DataHub dataHub;
    private LuceneDbServer luceneDbServer;

    /**
     * @param name   thread name; the text after the last {@code '-'} is used as this
     *               thread's sub-directory name
     * @param writer index writer this thread starts with
     */
    public ConsumerThread(String name, IndexWriter writer) {
        super(name);
        this.curentSubDir = name.substring(name.lastIndexOf('-') + 1);
        this.indexWriter = writer;
    }

    /** Resolves the collaborating beans and then enters the consume loop. */
    @Override
    public void run() {
        log.info("当前线程的索引目录为：{}", indexWriter.getDirectory().toString());
        ServerContext serverContext = SpringUtil.getBean(ServerContext.class);
        this.taxoWriter = serverContext.getTaxoWriter();
        this.dataHub = SpringUtil.getBean(DataHub.class);
        this.luceneDbServer = SpringUtil.getBean(LuceneDbServer.class);
        createIndexs();
    }

    /**
     * Consume loop: blocks on the queue, indexes the file, commits both writers and deletes
     * the file. Returns only when the thread is interrupted.
     */
    private void createIndexs() {
        while (true) {
            String filePath;
            try {
                filePath = DataHub.dataQueue.take();
            } catch (InterruptedException e) {
                log.error("", e);
                // Restore the flag for whoever inspects it and stop: looping again would
                // make take() throw immediately and spin forever.
                Thread.currentThread().interrupt();
                return;
            }
            changedIndexDir();
            if (!SpringUtil.haveLength(filePath)) {
                continue;
            }
            File dataFile = new File(filePath);
            try {
                indexDoc(indexWriter, dataFile);
                indexWriter.commit();
                taxoWriter.commit();
            } catch (IOException e) {
                log.error("索引提交过程中出现了异常，保存目录为{}", indexWriter.getDirectory(), e);
            }
            // The file is removed even when indexing/commit failed (original behavior).
            FileUtils.deleteFile(dataFile);
            changedIndexDir();
        }
    }

    /**
     * Switches to a new index directory when the formatted current date differs from the
     * one this thread last used, and reports the previous day's directory to the data hub.
     */
    private void changedIndexDir() {
        // One clock reading so "today" and "yesterday" cannot straddle midnight.
        LocalDateTime now = LocalDateTime.now();
        String currentStr = now.format(LuceneDbServer.Y_M_D_DTF);
        if (currentStr.equals(subTodayDir)) {
            return;
        }
        log.info("当前线程在文件处理完之后索引目录发生了切换");
        String threadName = Thread.currentThread().getName();
        closeWriter(indexWriter);

        String dir = currentStr + File.separator + curentSubDir;
        indexWriter = luceneDbServer.changeDir(threadName, dir);
        subTodayDir = currentStr;
        // NOTE(review): if the thread idled across more than one date change, the directory
        // that was just closed is older than "yesterday" - confirm what DataHub expects.
        String yesterdayStr = now.minusDays(1).format(LuceneDbServer.Y_M_D_DTF);
        dataHub.putMessageData(yesterdayStr + File.separator + curentSubDir);
    }

    /** Closes the given writer, logging (not propagating) any I/O failure. */
    private void closeWriter(IndexWriter indexWriter) {
        try {
            indexWriter.close();
        } catch (IOException e) {
            log.error("", e);
        }
    }

    /**
     * Indexes one data file.
     *
     * @return number of documents added
     */
    private int indexDoc(IndexWriter writer, File file) {
        return commFileDeal(writer, file);
    }

    /**
     * Reads {@code file} line by line as UTF-8, converts each non-blank line into a document
     * and adds it to {@code writer}. I/O problems are logged and end the scan of this file.
     *
     * @param writer index writer receiving the documents
     * @param file   data file with one JSON record per line
     * @return number of documents added
     * @throws IllegalStateException if the {@code system.lucenedb.datafield} property is missing
     */
    private int commFileDeal(IndexWriter writer, File file) {
        String fieldConfig = SpringUtil.getProperValue("system.lucenedb.datafield");
        if (fieldConfig == null) {
            throw new IllegalStateException("Missing required property system.lucenedb.datafield");
        }
        String[] names = splitAndTrim(fieldConfig);

        Map<String, String[]> dataType = getConfIgOfField();
        LocalDateTime start = LocalDateTime.now();
        FacetsConfig facetsConfig = new FacetsConfig();

        int total = 0;
        contexIndex = 0;
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8), READ_BUFFER_SIZE)) {
            String line;
            // readLine() == null is the only reliable end-of-file signal; ready() is not.
            while ((line = in.readLine()) != null) {
                contexIndex++;
                String content = line.trim();
                if (content.isEmpty()) {
                    continue;
                }
                Document doc = dealDataByJson(names, dataType, content);
                if (doc == null) {
                    continue;
                }
                writer.addDocument(facetsConfig.build(taxoWriter, doc));
                total++;
            }
        } catch (FileNotFoundException e) {
            log.error("文件不存在：{}", file.getAbsolutePath());
        } catch (IOException e) {
            log.error("文件操作过程中出现了IO异常。", e);
        }
        if (dataHub.isPerformanceDebug()) {
            long times = Duration.between(start, LocalDateTime.now()).toMillis();
            // Scale before dividing so sub-second and fractional-second runs are not distorted.
            long speed = times <= 0 ? total : total * 1000L / times;
            log.info("{}条记录处理耗时：{}毫秒， {}条每秒", total, times, speed);
        }
        return total;
    }

    /**
     * Parses one JSON line and builds its document.
     *
     * @return the document, or {@code null} when the line cannot be parsed or a field cannot
     *         be built (the failure is logged and the record is skipped)
     */
    private Document dealDataByJson(String[] names, Map<String, String[]> dataType, String content) {
        Any any = deserializeFile(content);
        if (any == null) {
            return null;
        }
        try {
            return createDoc(names, dataType, any);
        } catch (RuntimeException e) {
            // A single bad record (unparsable number, index outside the name list, ...) must
            // not terminate the consumer thread; skip it like an unparsable JSON line.
            log.error("在构建文档的过程中出现异常，文件中的位置是第{}行，语句为：{}", contexIndex, content, e);
            return null;
        }
    }

    /**
     * Deserializes one line of JSON.
     *
     * @return the parsed value, or {@code null} if parsing failed or produced nothing
     */
    private Any deserializeFile(String content) {
        try {
            return JsonIterator.deserialize(content);
        } catch (Exception e) {
            log.error("在转化json的过程中出现异常，文件中的位置是第{}行，语句为：{}", contexIndex, content, e);
            return null;
        }
    }

    /** Builds a document by running the field builder for every configured type key. */
    private Document createDoc(String[] names, Map<String, String[]> dataType, Any any) {
        Document doc = new Document();
        for (String key : dataType.keySet()) {
            switch (key) {
                case INT_INDEX:
                    createIntField(doc, names, any, dataType);
                    break;
                case LONG_INDEX:
                    createLongField(doc, names, any, dataType);
                    break;
                case DOUBLE_INDEX:
                    createDoubleField(doc, names, any, dataType);
                    break;
                case TEXT_INDEX:
                    createTextField(doc, names, any, dataType);
                    break;
                case STRING_INDEX:
                    createStringField(doc, names, any, dataType);
                    break;
                default:
                    // facet / docvalue / excnumeric / order are modifiers, not field types.
                    break;
            }
        }
        return doc;
    }

    /** Adds a stored {@link TextField} for every index listed under the "text" key. */
    private void createTextField(Document doc, String[] names, Any values, Map<String, String[]> dataType) {
        for (String config : dataType.get(TEXT_INDEX)) {
            String fieldName = fieldNameAt(names, config);
            String fieldValue = values.get(fieldName).toString();
            doc.add(new TextField(fieldName, fieldValue, Field.Store.YES));
        }
    }

    /** Adds a {@link FacetField} when {@code fieldName} is listed under the "facet" key. */
    private void makeFacetField(Document doc, String fieldName, String fieldValue, Map<String, String[]> dataType) {
        if (isListed(dataType, FACET_KEY, fieldName)) {
            doc.add(new FacetField(fieldName, fieldValue));
        }
    }

    /**
     * Adds a stored {@link StringField} for every index listed under the "string" key, plus
     * a facet field and/or a {@link SortedDocValuesField} when the name is configured for them.
     */
    private void createStringField(Document doc, String[] names, Any values, Map<String, String[]> dataType) {
        for (String config : dataType.get(STRING_INDEX)) {
            String fieldName = fieldNameAt(names, config);
            String fieldValue = values.get(fieldName).toString();

            doc.add(new StringField(fieldName, fieldValue, Field.Store.YES));
            makeFacetField(doc, fieldName, fieldValue, dataType);
            if (isListed(dataType, DOC_VALUE_KEY, fieldName)) {
                doc.add(new SortedDocValuesField(fieldName, new BytesRef(fieldValue)));
            }
        }
    }

    /** Adds point, stored and doc-value fields for every index listed under the "double" key. */
    private void createDoubleField(Document doc, String[] names, Any values, Map<String, String[]> dataType) {
        for (String config : dataType.get(DOUBLE_INDEX)) {
            String fieldName = fieldNameAt(names, config);
            String fieldValue = values.get(fieldName).toString();

            double value = Double.parseDouble(fieldValue);
            doc.add(new StoredField(fieldName, value));
            makeDocValue(doc, dataType, fieldName, fieldValue, new DoublePoint(fieldName, value));
        }
    }

    /**
     * @return {@code true} unless {@code fieldName} is listed under the "excnumeric" key,
     *         i.e. whether doc values should be created for this numeric field
     */
    private boolean hasCreate(Map<String, String[]> dataType, String fieldName) {
        return !isListed(dataType, EXCLUDED_NUMERIC_KEY, fieldName);
    }

    /**
     * Sort fields.
     *
     * @param dataType  configuration map
     * @param fieldName field to look up
     * @return {@code true} when {@code fieldName} is listed under the "order" key
     */
    private boolean hasOrder(Map<String, String[]> dataType, String fieldName) {
        return isListed(dataType, ORDER_KEY, fieldName);
    }

    /** Adds point, stored and doc-value fields for every index listed under the "long" key. */
    private void createLongField(Document doc, String[] names, Any values, Map<String, String[]> dataType) {
        for (String config : dataType.get(LONG_INDEX)) {
            String fieldName = fieldNameAt(names, config);
            String fieldValue = values.get(fieldName).toString();

            long value = Long.parseLong(fieldValue);
            doc.add(new StoredField(fieldName, value));
            makeDocValue(doc, dataType, fieldName, fieldValue, new LongPoint(fieldName, value));
        }
    }

    /**
     * Adds the given point field, an optional facet field, and - unless the field is
     * excluded - either a numeric doc value (order fields) or a sorted doc value.
     */
    private void makeDocValue(Document doc, Map<String, String[]> dataType, String fieldName, String fieldValue, Field field) {
        doc.add(field);
        makeFacetField(doc, fieldName, fieldValue, dataType);
        if (!hasCreate(dataType, fieldName)) {
            return;
        }
        if (hasOrder(dataType, fieldName)) {
            // Only this kind of field can be sorted on.
            // NOTE(review): Long.valueOf rejects non-integer text, so a "double" field that is
            // also an order field fails here for values such as "1.5" - confirm intended type.
            doc.add(new NumericDocValuesField(fieldName, Long.valueOf(fieldValue)));
            return;
        }
        // Grouping field. The original author's notes list these names as reaching this
        // branch: srcPort, destPort, dbType, operType, operSentenceLen, rowNum, sqlResponse,
        // returnContentLen, dealState, ruleType.
        doc.add(new SortedDocValuesField(fieldName, new BytesRef(fieldValue)));
    }

    /** Adds point, stored and doc-value fields for every index listed under the "int" key. */
    private void createIntField(Document doc, String[] names, Any values, Map<String, String[]> dataType) {
        for (String config : dataType.get(INT_INDEX)) {
            String fieldName = fieldNameAt(names, config);
            String fieldValue = values.get(fieldName).toString();

            int value = Integer.parseInt(fieldValue);
            doc.add(new StoredField(fieldName, value));
            makeDocValue(doc, dataType, fieldName, fieldValue, new IntPoint(fieldName, value));
        }
    }

    /**
     * Loads the field configuration from the application properties.
     *
     * @return map from configuration key ("int", "long", "double", "string", "text",
     *         "facet", "docvalue", "excnumeric", "order") to its comma-separated entries;
     *         keys whose property is absent or blank are omitted
     */
    private Map<String, String[]> getConfIgOfField() {
        Map<String, String[]> dataType = new HashMap<>(16);
        putConfig(dataType, INT_INDEX, "system.lucenedb.datafield.IntPoint");
        putConfig(dataType, LONG_INDEX, "system.lucenedb.datafield.LongPoint");
        putConfig(dataType, DOUBLE_INDEX, "system.lucenedb.datafield.DoublePoint");
        putConfig(dataType, STRING_INDEX, "system.lucenedb.datafield.StringField");
        putConfig(dataType, TEXT_INDEX, "system.lucenedb.datafield.TextField");
        putConfig(dataType, FACET_KEY, "system.lucenedb.need.facetfield");
        putConfig(dataType, DOC_VALUE_KEY, "system.lucenedb.need.string.docvalue");
        putConfig(dataType, EXCLUDED_NUMERIC_KEY, "system.lucenedb.exclusions.numeric.docvalue");
        putConfig(dataType, ORDER_KEY, "system.lucenedb.need.order.docvalue");
        return dataType;
    }

    /** Stores the split value of {@code property} under {@code key} when it is set and non-blank. */
    private static void putConfig(Map<String, String[]> dataType, String key, String property) {
        String raw = SpringUtil.getProperValue(property);
        if (raw != null && !raw.trim().isEmpty()) {
            dataType.put(key, splitAndTrim(raw));
        }
    }

    /** Splits on commas and trims each entry, keeping positions intact. */
    private static String[] splitAndTrim(String csv) {
        String[] parts = csv.split(",");
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
        }
        return parts;
    }

    /** Resolves a configured position (decimal text) to the field name at that position. */
    private static String fieldNameAt(String[] names, String indexConfig) {
        return names[Integer.parseInt(indexConfig)];
    }

    /** @return whether {@code fieldName} appears in the list under {@code key}; an absent list counts as empty */
    private static boolean isListed(Map<String, String[]> dataType, String key, String fieldName) {
        String[] configured = dataType.get(key);
        if (configured == null) {
            return false;
        }
        for (String name : configured) {
            if (fieldName.equals(name)) {
                return true;
            }
        }
        return false;
    }
}
